Predictability of Kullback-Leibler (KL) Divergence#

Introduction#

Streamflow prediction in ungauged basins (PUB) has classically focused on minimizing one or more loss functions, typically squared error, NSE, or KGE, between observed daily streamflow values and predictions generated by some type of model, whether an empirical regionalization, statistical machine learning, or process-based rainfall-runoff model. Regionalization and machine learning models depend upon an existing streamflow monitoring network for PUB, and the performance of these models is linked to the existing network's representativeness of the ungauged space. This link raises the question of how varying the network arrangement affects the overall performance of PUB models in terms of an expectation of prediction error, that is, prediction error across all ungauged locations, and by extension whether there exist environmental signals orthogonal to streamflow with sufficient information to discriminate between network arrangements so as to minimize the expectation of prediction error over the ungauged space.

A simple interpretation of the loss functions commonly used in the PUB literature might be: "how close are mean daily streamflow predictions to observed values?" A much simpler question to ask of observational data is: "will a given model outperform random guessing in the long run?" This binary question is a starting point for approaching the optimal streamflow monitoring network problem. The justification for asking such a basic question is that an expectation of the uncertainty reduction a given monitoring arrangement provides over the unmonitored space supports a discriminant function for comparing unique arrangements. In addition, more complex questions quickly become intractable, given that the number of comparisons grows exponentially with the size of the decision space (possible monitoring locations). A simple, informative problem formulation can then be tested on real data, in this case a space of over 1 million ungauged catchments and a set of over 1600 monitored catchments with which to train a model.

The binary prediction problem is followed by a regression problem where the goal is to minimize the expectation of prediction error based on the Kullback-Leibler divergence \(D_{KL}\), a surrogate loss function from the class of information discriminant measures which is consistent with the exponential loss [Nguyen et al., 2009].

Problem Formulation#

Binary Prediction Problem#

The first question, "will a given model outperform random guessing in the long run?", is formulated as a binary classification problem as follows:

  1. The streamflow prediction model assumes that discharge at one location is equal to that at a different location on a unit area basis, commonly referred to as an equal unit area runoff (UAR) model.

  2. Given the equal UAR model, an observed (proxy) catchment is a potential model for predicting the distribution of UAR for any unobserved (target) location,

  3. A proxy is “informative” to a target if the proxy UAR is closer to the posterior target UAR than the maximum uncertainty (uniform distribution) prior.

  4. A proxy is “disinformative” to a target if the maximum uncertainty (uniform distribution) prior is closer to the posterior target UAR than the proxy UAR.

  5. The “closeness” of distributions is characterized by three measures from the general class of f-divergences, namely the total variation distance (TVD), the Kullback-Leibler divergence (\(D_{KL}\)), and the earth mover’s distance (EMD), also known as the Wasserstein distance.

    For the Kullback-Leibler divergence \(D_{KL}\):

  6. The (posterior) target UAR distribution is denoted by \(P\), and the proxy UAR distribution (model) is denoted by \(Q\).

  7. A proxy model is informative for some target location if \(D_{KL}(P||\mathbb{U}) - D_{KL}(P||Q) > 0\)

    The binary problem formulation is then:

  8. The discriminant function maps the difference in the two divergences to a binary outcome corresponding to the sign of the resulting quantity \(Y = +1 \text{ if } D_{KL}(P||\mathbb{U}) - D_{KL}(P||Q) > 0 \text{ or } -1 \text{ otherwise}\)

  9. The goal is to minimize the probability of incorrect predictions, defined by the (Bayes) error \(R_{\textbf{Bayes}}(\gamma, C) := \mathbb{P} \left(Y \neq \text{sign}\left(D_{KL}(\mathbf{P}||\mathbb{U}) - D_{KL}(\mathbf{P}||\mathbf{Q})\right) \right)\)
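The discriminant in steps 7 and 8 can be sketched directly. The distributions and helper names below are illustrative, not part of the notebook's pipeline; \(P\), \(Q\), and the uniform prior are assumed to be discrete distributions over the same quantization dictionary.

```python
import numpy as np

def kl_divergence(p, q):
    """D_KL(P||Q) in bits; assumes q > 0 wherever p > 0."""
    mask = p > 0
    return np.sum(p[mask] * np.log2(p[mask] / q[mask]))

def informative_label(p, q):
    """+1 if the proxy Q is closer to the target P than the uniform prior."""
    u = np.full_like(p, 1.0 / len(p))
    return 1 if kl_divergence(p, u) - kl_divergence(p, q) > 0 else -1

# toy example: target UAR distribution concentrated on low-flow states
p = np.array([0.6, 0.3, 0.08, 0.02])
q_good = np.array([0.5, 0.35, 0.1, 0.05])  # proxy resembling the target
q_bad = np.array([0.05, 0.1, 0.35, 0.5])   # proxy shaped opposite to the target
print(informative_label(p, q_good))  # → 1 (informative)
print(informative_label(p, q_bad))   # → -1 (disinformative)
```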

Regression Problem#

The same problem setup applies for the regression prediction problem, which is to optimize the discriminant function and the input signal quantization simultaneously to minimize the error in predicting the KL divergence from catchment attributes.

The key difference in this step is that, instead of predicting a scalar measure that is a feature of a single location, the target variable describes a measure of the difference in runoff between pairs of locations. This approach asks whether the Kullback-Leibler divergence of the distribution of unit area runoff between two locations can be predicted from the attributes of both catchments (and their differences) using the gradient boosted decision tree method, which can also predict continuous variables, in this case \(D_{KL}\).
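A minimal sketch of how one pairwise training row might be assembled: attributes of both catchments plus their differences, with \(D_{KL}\) as the regression target. The station IDs, attribute values, and the `diff_` columns here are made up for illustration; the notebook's actual pipeline adds `proxy_` and `target_` columns via `add_attributes` below.

```python
import pandas as pd

# hypothetical attribute table indexed by station ID
attrs = pd.DataFrame({
    'official_id': ['08MH001', '08MH002'],
    'prcp': [1200.0, 950.0],
    'elevation_m': [410.0, 1280.0],
}).set_index('official_id')

def pairwise_row(proxy_id, target_id, dkl):
    """Build one training row for a (proxy, target) pair."""
    row = {'proxy': proxy_id, 'target': target_id, 'dkl': dkl}
    for col in attrs.columns:
        row[f'proxy_{col}'] = attrs.loc[proxy_id, col]
        row[f'target_{col}'] = attrs.loc[target_id, col]
        row[f'diff_{col}'] = attrs.loc[target_id, col] - attrs.loc[proxy_id, col]
    return row

df = pd.DataFrame([pairwise_row('08MH001', '08MH002', dkl=1.7)])
print(df[['diff_prcp', 'diff_elevation_m']])
```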

import os
import pandas as pd
import numpy as np
from time import time

from bokeh.plotting import figure, show
from bokeh.layouts import gridplot, row, column
from bokeh.models import ColumnDataSource
from bokeh.io import output_notebook
from bokeh.palettes import Sunset10, Vibrant7

import xgboost as xgb
xgb.set_config(verbosity=2)  # note: config_context() is a context manager; set_config applies globally

from sklearn.metrics import (
    root_mean_squared_error,
    mean_absolute_error,
    roc_auc_score,
    accuracy_score,
)

import data_processing_functions as dpf

from scipy.stats import linregress
output_notebook()

BASE_DIR = os.getcwd()

Load attribute data#

# load the catchment characteristics
fname = 'BCUB_watershed_attributes_updated.csv'
attr_df = pd.read_csv(os.path.join('data', fname))
attr_df.columns = [c.lower() for c in attr_df.columns]
station_ids = attr_df['official_id'].values
print(f'There are {len(station_ids)} monitored basins in the attribute set.')
There are 1609 monitored basins in the attribute set.

Load pairwise attribute comparisons#

Load a few rows from one of the pairwise data files. These files contain divergence measures computed on concurrent and non-concurrent time series at pairs of monitored locations.

# open an example pairwise results file
input_folder = os.path.join(
    BASE_DIR, "data", "processed_divergence_inputs",
)
pairs_files = os.listdir(input_folder)
test_df = pd.read_csv(os.path.join(input_folder, pairs_files[0]), nrows=1000)
kld_columns = [c for c in test_df.columns if 'dkl' in c]
kld_columns
['dkl_concurrent_uniform',
 'dkl_concurrent_post_-5R',
 'dkl_concurrent_post_-4R',
 'dkl_concurrent_post_-3R',
 'dkl_concurrent_post_-2R',
 'dkl_concurrent_post_-1R',
 'dkl_concurrent_post_0R',
 'dkl_concurrent_post_1R',
 'dkl_concurrent_post_2R',
 'dkl_concurrent_post_3R',
 'dkl_concurrent_post_4R',
 'dkl_concurrent_post_5R',
 'dkl_concurrent_post_6R',
 'dkl_concurrent_post_7R',
 'dkl_concurrent_post_8R',
 'dkl_concurrent_post_9R',
 'dkl_concurrent_post_10R',
 'dkl_nonconcurrent_uniform',
 'dkl_nonconcurrent_post_-5R',
 'dkl_nonconcurrent_post_-4R',
 'dkl_nonconcurrent_post_-3R',
 'dkl_nonconcurrent_post_-2R',
 'dkl_nonconcurrent_post_-1R',
 'dkl_nonconcurrent_post_0R',
 'dkl_nonconcurrent_post_1R',
 'dkl_nonconcurrent_post_2R',
 'dkl_nonconcurrent_post_3R',
 'dkl_nonconcurrent_post_4R',
 'dkl_nonconcurrent_post_5R',
 'dkl_nonconcurrent_post_6R',
 'dkl_nonconcurrent_post_7R',
 'dkl_nonconcurrent_post_8R',
 'dkl_nonconcurrent_post_9R',
 'dkl_nonconcurrent_post_10R']

Define attribute groupings#

terrain = ['drainage_area_km2', 'elevation_m', 'slope_deg', 'aspect_deg'] #'gravelius', 'perimeter',
land_cover = [
    'land_use_forest_frac_2010', 'land_use_grass_frac_2010', 'land_use_wetland_frac_2010', 'land_use_water_frac_2010', 
    'land_use_urban_frac_2010', 'land_use_shrubs_frac_2010', 'land_use_crops_frac_2010', 'land_use_snow_ice_frac_2010']
soil = ['logk_ice_x100', 'porosity_x100']
climate = ['prcp', 'srad', 'swe', 'tmax', 'tmin', 'vp', 'high_prcp_freq', 'high_prcp_duration', 'low_prcp_freq', 'low_prcp_duration']
all_attributes = terrain + land_cover + soil + climate
len(all_attributes)
24

Set trial parameters#

# define the amount of data to set aside for final testing
holdout_pct = 0.10
nfolds = 5
n_boost_rounds = 2500
n_optimization_rounds = 20

#define if testing concurrent or nonconcurrent data
concurrent = 'concurrent'

# the input data file has an associated revision date
revision_date = '20240812'

all_test_results = {}
attribute_set_names = ['climate', '+land_cover', '+terrain', '+soil']

results_folder = os.path.join(BASE_DIR, 'data', 'kld_prediction_results')
if not os.path.exists(results_folder):
    os.makedirs(results_folder)
attr_df.columns
Index(['region', 'official_id', 'drainage_area_km2', 'centroid_lon_deg_e',
       'centroid_lat_deg_n', 'logk_ice_x100', 'porosity_x100',
       'land_use_forest_frac_2010', 'land_use_shrubs_frac_2010',
       'land_use_grass_frac_2010', 'land_use_wetland_frac_2010',
       'land_use_crops_frac_2010', 'land_use_urban_frac_2010',
       'land_use_water_frac_2010', 'land_use_snow_ice_frac_2010',
       'lulc_check_2010', 'land_use_forest_frac_2015',
       'land_use_shrubs_frac_2015', 'land_use_grass_frac_2015',
       'land_use_wetland_frac_2015', 'land_use_crops_frac_2015',
       'land_use_urban_frac_2015', 'land_use_water_frac_2015',
       'land_use_snow_ice_frac_2015', 'lulc_check_2015',
       'land_use_forest_frac_2020', 'land_use_shrubs_frac_2020',
       'land_use_grass_frac_2020', 'land_use_wetland_frac_2020',
       'land_use_crops_frac_2020', 'land_use_urban_frac_2020',
       'land_use_water_frac_2020', 'land_use_snow_ice_frac_2020',
       'lulc_check_2020', 'slope_deg', 'aspect_deg', 'median_el', 'mean_el',
       'max_el', 'min_el', 'elevation_m', 'prcp', 'tmin', 'tmax', 'vp', 'swe',
       'srad', 'low_prcp_duration', 'low_prcp_freq', 'high_prcp_duration',
       'high_prcp_freq', 'mean_runoff'],
      dtype='object')
def add_attributes(attr_df, df_relations, attribute_cols):
    """
    Adds attributes from the df_attributes to the df_relations based on the 'proxy' and 'target' columns
    using map for efficient lookups.

    Parameters:
    df_attributes (pd.DataFrame): DataFrame with 'id' and attribute columns.
    df_relations (pd.DataFrame): DataFrame with 'proxy' and 'target' columns.
    attribute_cols (list of str): List of attribute columns to add to df_relations.

    Returns:
    pd.DataFrame: Updated df_relations with added attribute columns.
    """
    # Create dictionaries for each attribute for quick lookup
    attr_dicts = {col: attr_df.set_index('official_id')[col].to_dict() for col in attribute_cols}

    # Add target attributes
    for col in attribute_cols:
        df_relations[f'target_{col}'] = df_relations['target'].map(attr_dicts[col])

    # Add proxy attributes
    for col in attribute_cols:
        df_relations[f'proxy_{col}'] = df_relations['proxy'].map(attr_dicts[col])

    return df_relations
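The map-based lookup used in `add_attributes` can be exercised on a toy table; the station IDs and values here are made up for illustration.

```python
import pandas as pd

attr = pd.DataFrame({'official_id': ['A', 'B'], 'prcp': [1000.0, 800.0]})
relations = pd.DataFrame({'proxy': ['A', 'B'], 'target': ['B', 'A']})

# build a dict once, then map each ID column to its attribute value
lookup = attr.set_index('official_id')['prcp'].to_dict()
relations['target_prcp'] = relations['target'].map(lookup)
relations['proxy_prcp'] = relations['proxy'].map(lookup)
print(relations)
```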
def predict_KLD_from_attributes(attr_df, target_col, holdout_pct, stations, nfolds, results_folder, partial_counts=False):

    training_stn_cv_sets, test_stn_sets = dpf.train_test_split_by_official_id(holdout_pct, stations, nfolds)
    all_test_results = {}
    for bitrate in [4, 6, 8, 9, 10, 11, 12]:
        t0 = time()
        print(f'bitrate = {bitrate}')
        fname = f"KL_results_{bitrate}bits_{revision_date}.csv"
        if partial_counts:
            fname = f"KL_results_{bitrate}bits_{revision_date}_partial_counts.csv"

        input_data_fpath = os.path.join(input_folder, fname)
        nrows = None
        df = pd.read_csv(input_data_fpath, nrows=nrows, low_memory=False)
        df.dropna(subset=[target_col], inplace=True)
        t1 = time()
        print(f'    {t1-t0:.2f}s to load input data')

        # add the attributes into the input dataset
        df = add_attributes(attr_df, df, all_attributes)
        
        all_test_results[bitrate] = {}
        input_attributes = []

        # add attribute groups successively
        for attribute_set, set_name in zip([land_cover, terrain, soil, climate], attribute_set_names):
            print(f'  Processing {set_name} attribute set: {target_col}')
            input_attributes += attribute_set 
                        
            features = dpf.format_features(input_attributes)
            
            trial_df, test_df = dpf.run_xgb_trials_custom_CV(
                bitrate, set_name, features, target_col, df, 
                training_stn_cv_sets, test_stn_sets, n_optimization_rounds, 
                nfolds, n_boost_rounds, results_folder
            )
            
            test_rmse = root_mean_squared_error(test_df['actual'], test_df['predicted'])
            test_mae = mean_absolute_error(test_df['actual'], test_df['predicted'])

            print(f'   held-out test rmse: {test_rmse:.2f}, mae: {test_mae:.2f}')
            print('')
            # store the test set predictions and actuals
            all_test_results[bitrate][set_name] = {
                'trials': trial_df, 'test_df': test_df,
                'test_mae': test_mae, 'test_rmse': test_rmse} 
    return all_test_results

Run Models#

priors_to_test = [-2, -1, 0, 1, 2]

for prior in priors_to_test:
    target_col = f'dkl_{concurrent}_post_{prior}R'
    test_results_fname = f'{target_col}_{prior}_prior_results.npy'
    test_results_fpath = os.path.join(results_folder, test_results_fname)
    if os.path.exists(test_results_fpath):
        all_test_results = np.load(test_results_fpath, allow_pickle=True).item()
    else:
        all_test_results = predict_KLD_from_attributes(attr_df, target_col, holdout_pct, station_ids, nfolds, results_folder)
        np.save(test_results_fpath, all_test_results)
def load_result_by_prior(prior):
    fname = f'dkl_{concurrent}_post_{prior}R_{prior}_prior_results.npy'
    fpath = os.path.join(results_folder, fname)
    return np.load(fpath, allow_pickle=True).item()
layout_dict = {}
reg_plots_dict = {}
res_r2_dict = {}
for prior in priors_to_test:
    plots = []
    result = load_result_by_prior(prior)
    reg_plots_dict[prior] = {}
    res_r2_dict[prior] = {}
    for b, set_dict in result.items():
        test_rmse, test_mae = [], []
        attribute_sets = list(set_dict.keys())
    
        y1 = [set_dict[e]['test_rmse'] for e in attribute_sets]
        y2 = [set_dict[e]['test_mae'] for e in attribute_sets]
        
        source = ColumnDataSource({'x': attribute_sets, 'y1': y1, 'y2': y2})
        
        title = f'{b} bits (Q(θ|D)∼Dirichlet(α10^{prior}_K))'
        if len(plots) == 0:
            fig = figure(title=title, x_range=attribute_sets)
        else:
            fig = figure(title=title, x_range=attribute_sets, y_range=plots[0].y_range)
        fig.line('x', 'y1', legend_label='rmse', color='green', source=source, line_width=3)
        fig.line('x', 'y2', legend_label='mae', color='dodgerblue', source=source, line_width=3)
        fig.legend.background_fill_alpha = 0.6
        fig.yaxis.axis_label = 'RMSE'
        fig.xaxis.axis_label = 'Attribute Group (additive)'
        
        result_df = pd.DataFrame({'set': attribute_sets, 'rmse': y1, 'mae': y2})
        best_rmse_idx = result_df['rmse'].idxmin()
        best_mae_idx = result_df['mae'].idxmin()
        best_rmse_set = result_df.loc[best_rmse_idx, 'set']
        best_mae_set = result_df.loc[best_mae_idx, 'set']
        best_result = set_dict[best_rmse_set]['test_df']
        
        xx, yy = best_result['actual'], best_result['predicted']
        slope, intercept, r, p, se = linregress(xx, yy)
        
        sfig = figure(title=f'Test: {b} bits best model {best_rmse_set} (N={len(best_result)})')
        sfig.scatter(xx, yy, size=1, alpha=0.6)
        xpred = np.linspace(min(xx), max(xx), 100)
        ybf = [slope * e + intercept for e in xpred]
        sfig.line(xpred, ybf, color='red', line_width=3, line_dash='dashed', legend_label=f'R²={r**2:.2f}')   
        # plot a 1:1 line
        sfig.line([min(yy), max(yy)], [min(yy), max(yy)], color='black', line_dash='dotted', 
                  line_width=2, legend_label='1:1')
        sfig.xaxis.axis_label = r'Actual $$D_{KL}$$ [bits/sample]'
        sfig.yaxis.axis_label = r'Predicted $$D_{KL}$$ [bits/sample]'
        sfig.legend.location = 'top_left'
        reg_plots_dict[prior][b] = sfig
        res_r2_dict[prior][b] = r**2
        plots.append(fig)
        plots.append(sfig)
    layout_dict[prior] = gridplot(plots, ncols=2, width=350, height=300)
show(layout_dict[-2])
show(layout_dict[-1])
show(layout_dict[0])
show(layout_dict[1])
show(layout_dict[2])
col_figs = []
for prior in [-2, -1, 0, 1, 2]:
    col = column([reg_plots_dict[prior][b] for b in [4, 6, 8, 9, 10, 11, 12]])
    col_figs.append(col)
        
reg_layout = row(col_figs)
show(reg_layout)
from bokeh.transform import linear_cmap
from bokeh.models import ColorBar, ColumnDataSource
from bokeh.layouts import gridplot
from bokeh.palettes import Viridis256, gray, magma

# Convert the nested dict into a DataFrame
df = pd.DataFrame(res_r2_dict).T  # transpose so priors are rows and bitrates are columns
df.index.name = 'Prior'
df.columns.name = 'Bitrate'

df
Bitrate 4 6 8 9 10 11 12
Prior
-2 0.667416 0.672688 0.715624 0.738329 0.753776 0.753705 0.774256
-1 0.654098 0.634524 0.682851 0.711636 0.717136 0.714581 0.737956
0 0.719002 0.746315 0.754369 0.770585 0.755401 0.747406 0.740944
1 0.671507 0.626959 0.569079 0.520582 0.498875 0.488139 0.475856
2 0.479448 0.342760 0.379661 0.435185 0.447244 0.463187 0.458513
# Melt the DataFrame to a long format
df_melted = df.reset_index().melt(id_vars='Prior', var_name='Bitrate', value_name='Value')
# Ensure the Bitrate values are ordered correctly (increasing order)
df_melted['Bitrate'] = pd.Categorical(df_melted['Bitrate'], categories=sorted(df_melted['Bitrate'].unique(), reverse=False), ordered=True)

# Create a Bokeh ColumnDataSource
source = ColumnDataSource(df_melted)

# Create a figure for the heatmap
p = figure(title="KL divergence from attributes: R² of test set by Prior and Bitrate",width=600, height=500,
           tools="hover", tooltips=[('Value', '@Value')], toolbar_location=None)

# Create a color mapper
mapper = linear_cmap(field_name='Value', palette=magma(256), low=df_melted.Value.min(), high=df_melted.Value.max())

# Add rectangles to the plot
p.rect(x="Prior", y="Bitrate", width=1, height=1, source=source,
       line_color=None, fill_color=mapper)

# Add color bar
color_bar = ColorBar(color_mapper=mapper['transform'], width=8, location=(0,0))
p.add_layout(color_bar, 'right')

# Format plot
p.axis.axis_line_color = None
p.axis.major_tick_line_color = None
p.xaxis.axis_label = r'$$Q(θ|D)∼\text{Dirichlet}(\alpha = 10^{a})$$'
p.yaxis.axis_label = r'$$\text{Quantization Bitrate (dictionary size)}$$'
p.axis.major_label_text_font_size = "10pt"
p.axis.major_label_standoff = 0
p.xaxis.major_label_orientation = 1.0

# Output the plot to an HTML file and display it
# output_file("heatmap.html")
show(p)

Discussion#

Since the KL divergence \(D_{KL}(P||Q) = \sum_i p_i \log\left(\frac{p_i}{q_i}\right)\) is infinite when any \(q_i = 0\) for which \(p_i > 0\), the simulated \(Q\) is treated as a posterior distribution by assuming a uniform (Dirichlet) prior \(\alpha = [a_1, \dots, a_n]\). The prior \(\alpha\) is an additive array of uniform pseudo-counts used to address the (commonly occurring) case where \(q_i = 0\), that is, the model does not predict an observed state \(i\). In this experiment we tested a wide range of priors on an exponential scale, \(\alpha = 10^a, a \in \{-2, -1, 0, 1, 2\}\).

The scale of the pseudo-count can be interpreted as a strength of belief in the model. Small \(a\) represents strong belief that the model produces a distribution \(Q\) that is representative of the “true” (observed posterior) distribution \(P\), and for a fixed \(p_i\) the effect of a decreasing \(a\) on the discriminant function \(D_{KL}\) yields a stronger penalty for a model that fails to predict an observed state. A large \(a\) represents weak belief that the model produces a distribution \(Q\) that is representative of \(P\), since \(Q\) approaches the uniform distribution \(\mathbb{U}\) as \(a\) increases.
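The smoothing described above can be sketched as follows. The function name and toy counts are illustrative, under the assumption that the posterior \(Q\) is formed by adding \(\alpha = 10^a\) pseudo-counts to every state's count before normalizing, so larger \(a\) pulls \(Q\) toward the uniform distribution.

```python
import numpy as np

def posterior_q(counts, a):
    """Dirichlet-smoothed posterior: add 10**a pseudo-counts per state."""
    counts = np.asarray(counts, dtype=float)
    alpha = 10.0 ** a
    return (counts + alpha) / (counts.sum() + alpha * len(counts))

counts = np.array([50, 30, 0, 20])  # state 3 never predicted by the proxy
for a in [-2, 0, 2]:
    # a = -2 leaves q_3 near zero (strong belief in the model);
    # a = 2 flattens Q toward uniform (weak belief in the model)
    print(a, np.round(posterior_q(counts, a), 3))
```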

Adding pseudo-counts dilutes the signal available for the gradient boosting model to exploit in minimizing prediction error. Analogously, varying the bitrate, or the size of the dictionary used to quantize continuous streamflow into discrete states, also introduces quantization noise, since the original streamflow signals are stored at three-decimal precision and are quantized into as few as 4 bits (a 16-symbol dictionary) and as many as 12 bits (a 4096-symbol dictionary). The range of dictionary sizes is set to cover the expected range of rating curve uncertainty, which is generally considered multiplicative and expressed as a percentage of the observed value.
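A minimal sketch of quantizing a continuous signal into a \(2^b\)-symbol dictionary, assuming equal-width bins over the observed range; the notebook's actual binning scheme may differ.

```python
import numpy as np

def quantize(uar, bits):
    """Map continuous values to symbols 1..2**bits via equal-width bins."""
    edges = np.linspace(uar.min(), uar.max(), 2**bits + 1)
    # np.digitize returns bin indices; clip the boundary case at the max value
    return np.clip(np.digitize(uar, edges), 1, 2**bits)

rng = np.random.default_rng(42)
uar = rng.lognormal(mean=0.0, sigma=1.0, size=1000)  # skewed, flow-like toy data
symbols = quantize(uar, bits=4)  # 16-symbol dictionary
print(symbols.min(), symbols.max())
```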

As the results show, priors representing the addition of \(10^1 \text{ to } 10^2\) pseudo-counts diminish the performance of the gradient boosted decision tree model, regardless of the dictionary size, or the number of possible values provided by the quantization. Heavily penalizing unpredicted states does not have as great an impact as anticipated, perhaps because the corresponding \(p_i\) values are also small.

Citations#

[NWJ09]

Nguyen, X., Wainwright, M. J., and Jordan, M. I. (2009). On surrogate loss functions and f-divergences. The Annals of Statistics, 37(2):876–904.